/**
 * Copyright (C) 2010-2013 Alibaba Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.broker.client;

import io.netty.channel.Channel;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.common.RemotingUtil;

/**
 * Manages producer groups and the channel (connection) of each producer in a group.
 *
 * @author shijia.wxr<vintage.wang@gmail.com>
 * @since 2013-7-26
 */
public class ProducerManager {
	private static final Logger log = LoggerFactory.getLogger(LoggerName.BrokerLoggerName);
	/** Maximum time, in milliseconds, to wait for {@link #groupChannelLock} before giving up. */
	private static final long LockTimeoutMillis = 3000;
	/** A channel whose last update is older than this many milliseconds is treated as expired. */
	private static final long ChannelExpiredTimeout = 1000 * 120;
	/** Guards every read and write of {@link #groupChannelTable} and of its inner maps. */
	private final Lock groupChannelLock = new ReentrantLock();
	/** Producer group name -> (channel -> client info). Only touched while holding the lock. */
	private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable = new HashMap<String, HashMap<Channel, ClientChannelInfo>>();

	public ProducerManager() {
	}

	/**
	 * Returns a snapshot of the group/channel table.
	 * <p>
	 * Both the outer map and every per-group map are copied while the lock is
	 * held, so the caller can iterate the result without racing against
	 * concurrent register/unregister/scan operations. The
	 * {@link ClientChannelInfo} values themselves are shared, not copied.
	 *
	 * @return a new map; empty if the lock could not be acquired within
	 *         {@code LockTimeoutMillis} or the calling thread was interrupted
	 */
	public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
		final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable = new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
		try {
			if (this.groupChannelLock.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
				try {
					for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {
						// Copy the inner map as well: a shallow putAll would hand out
						// the live per-group maps, which are mutated under the lock.
						newGroupChannelTable.put(entry.getKey(), new HashMap<Channel, ClientChannelInfo>(entry.getValue()));
					}
				} finally {
					this.groupChannelLock.unlock();
				}
			} else {
				log.warn("ProducerManager getGroupChannelTable lock timeout");
			}
		} catch (InterruptedException e) {
			handleInterrupted("getGroupChannelTable", e);
		}
		return newGroupChannelTable;
	}

	/**
	 * Scans all registered producer channels and removes those whose
	 * {@code lastUpdateTimestamp} is more than {@code ChannelExpiredTimeout}
	 * milliseconds in the past. Each removed channel is also closed.
	 * <p>
	 * Does nothing (other than logging) if the lock cannot be acquired in time.
	 */
	public void scanNotActiveChannel() {
		try {
			if (this.groupChannelLock.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
				try {
					for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {
						final String group = entry.getKey();
						final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();

						// Use an explicit iterator so entries can be removed while iterating.
						final Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
						while (it.hasNext()) {
							final ClientChannelInfo info = it.next().getValue();

							final long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
							if (diff > ChannelExpiredTimeout) {
								it.remove();
								log.warn("SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}", RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
								RemotingUtil.closeChannel(info.getChannel());
							}
						}
					}
				} finally {
					this.groupChannelLock.unlock();
				}
			} else {
				log.warn("ProducerManager scanNotActiveChannel lock timeout");
			}
		} catch (InterruptedException e) {
			handleInterrupted("scanNotActiveChannel", e);
		}
	}

	/**
	 * Removes the given channel from every producer group it is registered in.
	 *
	 * @param remoteAddr remote address of the channel, used only for logging
	 * @param channel the closed channel; a {@code null} channel is ignored
	 */
	public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
		if (channel == null) {
			return;
		}

		try {
			if (this.groupChannelLock.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
				try {
					for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {
						final String group = entry.getKey();
						final ClientChannelInfo clientChannelInfo = entry.getValue().remove(channel);
						if (clientChannelInfo != null) {
							log.info("NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}", clientChannelInfo, remoteAddr, group);
						}
					}
				} finally {
					this.groupChannelLock.unlock();
				}
			} else {
				log.warn("ProducerManager doChannelCloseEvent lock timeout");
			}
		} catch (InterruptedException e) {
			handleInterrupted("doChannelCloseEvent", e);
		}
	}

	/**
	 * Registers a producer channel under the given group, or refreshes the
	 * last-update timestamp of the entry already stored for that channel.
	 *
	 * @param group producer group name
	 * @param clientChannelInfo info describing the producer's channel
	 */
	public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
		try {
			if (this.groupChannelLock.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
				try {
					HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
					if (null == channelTable) {
						// First producer of this group: create the per-group channel table.
						channelTable = new HashMap<Channel, ClientChannelInfo>();
						this.groupChannelTable.put(group, channelTable);
					}

					final ClientChannelInfo clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
					if (null == clientChannelInfoFound) {
						// Unknown channel: store the supplied info as a new registration.
						channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
						log.info("new producer connected, group: {} channel: {}", group, clientChannelInfo);
					} else {
						// Known channel: refresh the timestamp. This is done while still
						// holding the lock so it is serialized with scanNotActiveChannel(),
						// which reads the timestamp under the same lock.
						clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
					}
				} finally {
					this.groupChannelLock.unlock();
				}
			} else {
				log.warn("ProducerManager registerProducer lock timeout");
			}
		} catch (InterruptedException e) {
			handleInterrupted("registerProducer", e);
		}
	}

	/**
	 * Removes a producer channel from the given group. When the group has no
	 * channel left afterwards, the group itself is removed from the table.
	 *
	 * @param group producer group name
	 * @param clientChannelInfo info whose channel identifies the entry to remove
	 */
	public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
		try {
			if (this.groupChannelLock.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
				try {
					final HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
					if (null != channelTable && !channelTable.isEmpty()) {
						final ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
						if (old != null) {
							log.info("unregister a producer[{}] from groupChannelTable {}", group, clientChannelInfo);
						}

						if (channelTable.isEmpty()) {
							this.groupChannelTable.remove(group);
							log.info("unregister a producer group[{}] from groupChannelTable", group);
						}
					}
				} finally {
					this.groupChannelLock.unlock();
				}
			} else {
				log.warn("ProducerManager unregisterProducer lock timeout");
			}
		} catch (InterruptedException e) {
			handleInterrupted("unregisterProducer", e);
		}
	}

	/**
	 * Common handling for an interrupt received while waiting for the lock:
	 * restores the thread's interrupt status (cleared when the exception was
	 * thrown) so that code further up the call stack can still observe it, and
	 * logs which operation was abandoned.
	 */
	private static void handleInterrupted(final String operation, final InterruptedException e) {
		Thread.currentThread().interrupt();
		log.error("ProducerManager " + operation + " interrupted while waiting for lock", e);
	}
}
